Coverage Report

Created: 2026-03-18 12:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
D:\a\scloud-dns\scloud-dns\src\workers\mod.rs
Line
Count
Source
1
use crate::exceptions::SCloudException;
2
use crate::workers::manager::StartGate;
3
use crate::workers::task::InFlightTask;
4
use crate::{log_error, log_info, log_sdebug, log_strace};
5
use anyhow::Result;
6
use serde::{Deserialize, Serialize};
7
use std::sync::Arc;
8
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering};
9
use tokio::sync::{Mutex, MutexGuard, Semaphore, mpsc};
10
11
pub(crate) mod manager;
12
pub(crate) mod queue;
13
pub(crate) mod task;
14
pub(crate) mod tests;
15
pub(crate) mod types;
16
17
#[allow(non_camel_case_types)]
18
#[derive(Debug)]
19
pub(crate) struct SCloudWorker {
20
    // IDENTITY
21
    pub(crate) worker_id: AtomicU64,
22
    pub(crate) worker_type: AtomicU8,
23
24
    // CHANNEL
25
    pub(crate) dns_tx: Mutex<Vec<mpsc::Sender<InFlightTask>>>,
26
    pub(crate) dns_rx: Mutex<Vec<mpsc::Receiver<InFlightTask>>>,
27
28
    // RESOURCES/LIMITS
29
    pub(crate) stack_size_bytes: AtomicUsize,
30
    pub(crate) buffer_budget_bytes: AtomicUsize,
31
    pub(crate) max_stack_size_bytes: AtomicUsize,
32
    pub(crate) max_buffer_budget_bytes: AtomicUsize,
33
34
    // RUNTIME STATE
35
    pub(crate) state: AtomicU8,
36
    pub(crate) shutdown_requested: AtomicBool,
37
    pub(crate) shutdown_mode: AtomicU8,
38
39
    // BACKPRESSURE/IN-FLIGHT
40
    pub(crate) in_flight: AtomicUsize, // for metrics
41
    pub(crate) in_flight_sem: Arc<Semaphore>,
42
    pub(crate) max_in_flight: AtomicUsize,
43
44
    // METRICS
45
    pub(crate) jobs_done: AtomicU64,
46
    pub(crate) jobs_failed: AtomicU64,
47
    pub(crate) jobs_retried: AtomicU64,
48
49
    pub(crate) last_job_started_ms: AtomicU64,
50
    pub(crate) last_job_finished_ms: AtomicU64,
51
52
    pub(crate) last_error_code: AtomicU64,
53
    pub(crate) last_error_at_ms: AtomicU64,
54
55
    // CORRELATION/TRACING
56
    pub(crate) last_task_id_hi: AtomicU64, // 128-bit UUID split
57
    pub(crate) last_task_id_lo: AtomicU64,
58
}
59
60
impl SCloudWorker {
61
    const NEVER_APPLIED: u8 = 0xFF;
62
63
69
    pub(crate) fn new(worker_type: WorkerType) -> Result<Self, SCloudException> {
64
69
        Ok(Self {
65
69
            worker_id: AtomicU64::new(manager::generate_worker_id()),
66
69
            worker_type: AtomicU8::new(worker_type as u8),
67
69
            dns_tx: Mutex::new(Vec::new()),
68
69
            dns_rx: Mutex::new(Vec::new()),
69
69
            stack_size_bytes: AtomicUsize::new(2 * 1024 * 1024),
70
69
            buffer_budget_bytes: AtomicUsize::new(4 * 1024 * 1024),
71
69
            max_stack_size_bytes: AtomicUsize::new(32 * 1024 * 1024),
72
69
            max_buffer_budget_bytes: AtomicUsize::new(256 * 1024 * 1024),
73
69
            state: AtomicU8::new(WorkerState::INIT as u8),
74
69
            shutdown_requested: AtomicBool::new(false),
75
69
            shutdown_mode: AtomicU8::new(ShutdownMode::GRACEFUL as u8),
76
69
            in_flight: AtomicUsize::new(0),
77
69
            in_flight_sem: Arc::new(Semaphore::new(512)),
78
69
            max_in_flight: AtomicUsize::new(512),
79
69
            jobs_done: AtomicU64::new(0),
80
69
            jobs_failed: AtomicU64::new(0),
81
69
            jobs_retried: AtomicU64::new(0),
82
69
            last_job_started_ms: AtomicU64::new(0),
83
69
            last_job_finished_ms: AtomicU64::new(0),
84
69
            last_error_code: AtomicU64::new(0),
85
69
            last_error_at_ms: AtomicU64::new(0),
86
69
            last_task_id_hi: AtomicU64::new(0),
87
69
            last_task_id_lo: AtomicU64::new(0),
88
69
        })
89
69
    }
90
91
18
    pub async fn run(self: Arc<Self>, gate: Option<Arc<StartGate>>) -> Result<(), SCloudException> {
92
18
        log_sdebug!(
93
            "Running SCloudWorker [ID: {}][TYPE: {:?}]",
94
18
            self.get_worker_id(),
95
18
            self.get_worker_type()
96
        );
97
98
18
        if let Some(g) = gate {
99
18
            g.done().await;
100
0
        }
101
18
        match WorkerType::try_from(self.worker_type.load(Ordering::Relaxed)).unwrap() {
102
            WorkerType::LISTENER => {
103
2
                return Err(SCloudException::SCLOUD_WORKER_LISTENER_NO_SOCKET);
104
            }
105
            WorkerType::DECODER => {
106
2
                self.clone().set_state(WorkerState::IDLE);
107
2
                let (
rx0
,
tx0
) = self.get_dns_rx_tx().await?;
108
0
                types::decoder::run_dns_decoder(self.clone(), rx, tx).await?;
109
            }
110
            WorkerType::QUERY_DISPATCHER => {
111
2
                self.clone().set_state(WorkerState::IDLE);
112
2
                let (
rx0
,
tx0
) = self.get_dns_rx_tx().await?;
113
0
                types::query_dispatcher::run_dns_query_dispatcher(self.clone(), rx, tx).await?;
114
            }
115
            WorkerType::CACHE_LOOKUP => {
116
2
                self.clone().set_state(WorkerState::IDLE);
117
2
                let (
rx0
,
tx0
) = self.get_dns_rx_tx().await?;
118
0
                types::cache_lookup::run_dns_cache_lookup(self.clone(), rx, tx).await?;
119
            }
120
            WorkerType::ZONE_MANAGER => {
121
2
                self.clone().set_state(WorkerState::IDLE);
122
2
                let (
rx0
,
tx0
) = self.get_dns_rx_tx().await?;
123
0
                types::zone_manager::run_dns_zone_manager(self.clone(), rx, tx).await?;
124
            }
125
            WorkerType::RESOLVER => {
126
2
                self.clone().set_state(WorkerState::IDLE);
127
2
                let (
rx0
,
tx0
) = self.get_dns_rx_tx().await?;
128
0
                types::resolver::run_dns_resolver(self.clone(), rx, tx).await?;
129
            }
130
            WorkerType::CACHE_WRITER => {
131
2
                self.clone().set_state(WorkerState::IDLE);
132
2
                let (
rx0
,
tx0
) = self.get_dns_rx_tx().await?;
133
0
                types::cache_writer::run_dns_cache_writer(self.clone(), rx, tx).await?;
134
            }
135
            WorkerType::ENCODER => {
136
2
                self.clone().set_state(WorkerState::IDLE);
137
2
                let (
rx0
,
tx0
) = self.get_dns_rx_tx().await?;
138
0
                types::encoder::run_dns_encoder(self.clone(), rx, tx).await?;
139
            }
140
            WorkerType::SENDER => {
141
1
                self.clone().set_state(WorkerState::IDLE);
142
1
                let 
rx0
= self.get_dns_rx().await?;
143
0
                types::sender::run_dns_sender(self.clone(), rx).await?;
144
            }
145
            WorkerType::CACHE_JANITOR => {
146
0
                self.clone().set_state(WorkerState::IDLE);
147
0
                types::cache_janitor::run_dns_cache_janitor(self.clone()).await?;
148
            }
149
            WorkerType::METRICS => {
150
0
                self.clone().set_state(WorkerState::IDLE);
151
0
                types::metrics::start_otlp_logger().await;
152
            }
153
            WorkerType::TCP_ACCEPTOR => {
154
1
                self.clone().set_state(WorkerState::IDLE);
155
1
                let 
tx0
= self.get_dns_tx().await?;
156
0
                types::tcp_acceptor::run_dns_tcp_acceptor(self.clone(), tx).await?;
157
            }
158
0
            _ => {}
159
        }
160
0
        Ok(())
161
18
    }
162
163
    #[inline]
164
41
    pub fn get_worker_id(&self) -> u64 {
165
41
        self.worker_id.load(Ordering::Relaxed)
166
41
    }
167
168
    #[inline]
169
50
    pub fn get_worker_type(&self) -> WorkerType {
170
50
        WorkerType::try_from(self.worker_type.load(Ordering::Relaxed)).unwrap()
171
50
    }
172
173
    #[inline]
174
0
    pub async fn push_dns_rx(&self, rx: mpsc::Receiver<InFlightTask>) {
175
0
        self.dns_rx.lock().await.push(rx);
176
0
    }
177
178
    #[inline]
179
0
    pub async fn push_dns_tx_many(&self, txs: Vec<mpsc::Sender<InFlightTask>>) {
180
0
        self.dns_tx.lock().await.extend(txs);
181
0
    }
182
183
    #[inline]
184
15
    pub async fn get_dns_rx_tx(
185
15
        &self,
186
15
    ) -> Result<
187
15
        (
188
15
            Vec<mpsc::Receiver<InFlightTask>>,
189
15
            Vec<mpsc::Sender<InFlightTask>>,
190
15
        ),
191
15
        SCloudException,
192
15
    > {
193
15
        Ok((self.get_dns_rx().await
?7
,
self8
.get_dns_tx().await
?7
))
194
15
    }
195
196
    #[inline]
197
10
    pub async fn get_dns_tx(&self) -> Result<Vec<mpsc::Sender<InFlightTask>>, SCloudException> {
198
10
        let mut guard = self.dns_tx.lock().await;
199
10
        if guard.is_empty() {
200
8
            return Err(SCloudException::SCLOUD_WORKER_TX_NOT_SET);
201
2
        }
202
2
        Ok(std::mem::take(&mut *guard))
203
10
    }
204
205
    #[inline]
206
17
    pub async fn get_dns_rx(&self) -> Result<Vec<mpsc::Receiver<InFlightTask>>, SCloudException> {
207
17
        let mut guard = self.dns_rx.lock().await;
208
17
        if guard.is_empty() {
209
8
            return Err(SCloudException::SCLOUD_WORKER_RX_NOT_SET);
210
9
        }
211
9
        Ok(std::mem::take(&mut *guard))
212
17
    }
213
214
    #[inline]
215
2
    pub fn get_stack_size_bytes(&self) -> usize {
216
2
        self.stack_size_bytes.load(Ordering::Relaxed)
217
2
    }
218
219
    #[inline]
220
2
    pub fn get_buffer_budget_bytes(&self) -> usize {
221
2
        self.buffer_budget_bytes.load(Ordering::Relaxed)
222
2
    }
223
224
    #[inline]
225
2
    pub fn get_max_stack_size_bytes(&self) -> usize {
226
2
        self.max_stack_size_bytes.load(Ordering::Relaxed)
227
2
    }
228
229
    #[inline]
230
2
    pub fn get_max_buffer_budget_bytes(&self) -> usize {
231
2
        self.max_buffer_budget_bytes.load(Ordering::Relaxed)
232
2
    }
233
234
    #[inline]
235
7
    pub fn get_state(&self) -> u8 {
236
7
        self.state.load(Ordering::Acquire)
237
7
    }
238
239
    #[inline]
240
2
    pub fn get_shutdown_requested(&self) -> bool {
241
2
        self.shutdown_requested.load(Ordering::Acquire)
242
2
    }
243
244
    #[inline]
245
3
    pub fn get_shutdown_mode(&self) -> u8 {
246
3
        self.shutdown_mode.load(Ordering::Acquire)
247
3
    }
248
249
    #[inline]
250
2
    pub fn get_in_flight(&self) -> usize {
251
2
        self.in_flight.load(Ordering::Relaxed)
252
2
    }
253
254
    #[inline]
255
1
    pub fn get_in_flight_sem(&self) -> usize {
256
1
        self.in_flight_sem.available_permits()
257
1
    }
258
259
    #[inline]
260
3
    pub fn get_max_in_flight(&self) -> usize {
261
3
        self.max_in_flight.load(Ordering::Relaxed)
262
3
    }
263
264
    #[inline]
265
2
    pub fn get_jobs_done(&self) -> u64 {
266
2
        self.jobs_done.load(Ordering::Relaxed)
267
2
    }
268
269
    #[inline]
270
2
    pub fn get_jobs_failed(&self) -> u64 {
271
2
        self.jobs_failed.load(Ordering::Relaxed)
272
2
    }
273
274
    #[inline]
275
2
    pub fn get_jobs_retried(&self) -> u64 {
276
2
        self.jobs_retried.load(Ordering::Relaxed)
277
2
    }
278
279
    #[inline]
280
2
    pub fn get_last_job_started_ms(&self) -> u64 {
281
2
        self.last_job_started_ms.load(Ordering::Relaxed)
282
2
    }
283
284
    #[inline]
285
2
    pub fn get_last_job_finished_ms(&self) -> u64 {
286
2
        self.last_job_finished_ms.load(Ordering::Relaxed)
287
2
    }
288
289
    #[inline]
290
2
    pub fn get_last_error_code(&self) -> u64 {
291
2
        self.last_error_code.load(Ordering::Relaxed)
292
2
    }
293
294
    #[inline]
295
2
    pub fn get_last_error_at_ms(&self) -> u64 {
296
2
        self.last_error_at_ms.load(Ordering::Relaxed)
297
2
    }
298
299
    #[inline]
300
2
    pub fn get_last_task_id_hi(&self) -> u64 {
301
2
        self.last_task_id_hi.load(Ordering::Relaxed)
302
2
    }
303
304
    #[inline]
305
2
    pub fn get_last_task_id_lo(&self) -> u64 {
306
2
        self.last_task_id_lo.load(Ordering::Relaxed)
307
2
    }
308
309
    #[inline]
310
1
    pub fn set_worker_id(&self, worker_id: u64) {
311
1
        self.worker_id.store(worker_id, Ordering::Relaxed);
312
1
    }
313
314
    #[inline]
315
13
    pub fn set_worker_type(&self, worker_type: WorkerType) {
316
13
        self.worker_type.store(worker_type as u8, Ordering::Relaxed);
317
13
    }
318
319
    #[inline]
320
1
    pub async fn set_dns_tx(&self, tx: mpsc::Sender<InFlightTask>) {
321
1
        self.dns_tx.lock().await.push(tx);
322
1
    }
323
324
    #[inline]
325
1
    pub async fn set_dns_rx(&self, rx: mpsc::Receiver<InFlightTask>) {
326
1
        self.dns_rx.lock().await.push(rx);
327
1
    }
328
329
    #[inline]
330
1
    pub fn set_stack_size_bytes(&self, stack_size_bytes: usize) {
331
1
        self.stack_size_bytes
332
1
            .store(stack_size_bytes, Ordering::Relaxed);
333
1
    }
334
335
    #[inline]
336
1
    pub fn set_buffer_budget_bytes(&self, buffer_budget_bytes: usize) {
337
1
        self.buffer_budget_bytes
338
1
            .store(buffer_budget_bytes, Ordering::Relaxed);
339
1
    }
340
341
    #[inline]
342
1
    pub fn set_max_stack_size_bytes(&self, max_stack_size_bytes: usize) {
343
1
        self.max_stack_size_bytes
344
1
            .store(max_stack_size_bytes, Ordering::Relaxed);
345
1
    }
346
347
    #[inline]
348
1
    pub fn set_max_buffer_budget_bytes(&self, max_buffer_budget_bytes: usize) {
349
1
        self.max_buffer_budget_bytes
350
1
            .store(max_buffer_budget_bytes, Ordering::Relaxed);
351
1
    }
352
353
    #[inline]
354
24
    pub fn set_state(&self, state: WorkerState) {
355
24
        self.state.store(state as u8, Ordering::Relaxed);
356
24
    }
357
358
    #[inline]
359
1
    pub fn set_shutdown_requested(&self, shutdown_requested: bool) {
360
1
        self.shutdown_requested
361
1
            .store(shutdown_requested, Ordering::Relaxed);
362
1
    }
363
364
    #[inline]
365
2
    pub fn set_shutdown_mode(&self, shutdown_mode: ShutdownMode) {
366
2
        self.shutdown_mode
367
2
            .store(shutdown_mode as u8, Ordering::Relaxed);
368
2
    }
369
370
    #[inline]
371
1
    pub fn set_in_flight(&self, in_flight: usize) {
372
1
        self.in_flight.store(in_flight, Ordering::Relaxed);
373
1
    }
374
375
    #[inline]
376
5
    pub fn set_max_in_flight(&self, max_in_flight: usize) {
377
5
        self.max_in_flight.store(max_in_flight, Ordering::Relaxed);
378
5
    }
379
380
    #[inline]
381
1
    pub fn set_jobs_done(&self, jobs_done: u64) {
382
1
        self.jobs_done.store(jobs_done, Ordering::Relaxed);
383
1
    }
384
385
    #[inline]
386
1
    pub fn set_jobs_failed(&self, jobs_failed: u64) {
387
1
        self.jobs_failed.store(jobs_failed, Ordering::Relaxed);
388
1
    }
389
390
    #[inline]
391
1
    pub fn set_jobs_retried(&self, jobs_retried: u64) {
392
1
        self.jobs_retried.store(jobs_retried, Ordering::Relaxed);
393
1
    }
394
395
    #[inline]
396
1
    pub fn set_last_job_started_ms(&self, last_job_started_ms: u64) {
397
1
        self.last_job_started_ms
398
1
            .store(last_job_started_ms, Ordering::Relaxed);
399
1
    }
400
401
    #[inline]
402
1
    pub fn set_last_job_finished_ms(&self, last_job_finished_ms: u64) {
403
1
        self.last_job_finished_ms
404
1
            .store(last_job_finished_ms, Ordering::Relaxed);
405
1
    }
406
407
    #[inline]
408
1
    pub fn set_last_error_code(&self, last_error_code: u64) {
409
1
        self.last_error_code
410
1
            .store(last_error_code, Ordering::Relaxed);
411
1
    }
412
413
    #[inline]
414
1
    pub fn set_last_error_at_ms(&self, last_error_at_ms: u64) {
415
1
        self.last_error_at_ms
416
1
            .store(last_error_at_ms, Ordering::Relaxed);
417
1
    }
418
419
    #[inline]
420
1
    pub fn set_last_task_id_hi(&self, last_task_id_hi: u64) {
421
1
        self.last_task_id_hi
422
1
            .store(last_task_id_hi, Ordering::Relaxed);
423
1
    }
424
425
    #[inline]
426
1
    pub fn set_last_task_id_lo(&self, last_task_id_lo: u64) {
427
1
        self.last_task_id_lo
428
1
            .store(last_task_id_lo, Ordering::Relaxed);
429
1
    }
430
}
431
432
#[repr(u8)]
433
#[allow(unused)]
434
#[allow(non_camel_case_types)]
435
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, Eq)]
436
pub enum WorkerType {
437
    NONE = 99,
438
    LISTENER = 0,
439
    DECODER = 1,
440
    QUERY_DISPATCHER = 2,
441
    CACHE_LOOKUP = 3,
442
    ZONE_MANAGER = 4,
443
    RESOLVER = 5,
444
    CACHE_WRITER = 6,
445
    ENCODER = 7,
446
    SENDER = 8,
447
448
    CACHE_JANITOR = 9,
449
450
    METRICS = 10,
451
    TCP_ACCEPTOR = 11,
452
}
453
454
impl TryFrom<u8> for WorkerType {
455
    type Error = ();
456
457
69
    fn try_from(v: u8) -> Result<Self, Self::Error> {
458
69
        Ok(match v {
459
7
            0 => WorkerType::LISTENER,
460
7
            1 => WorkerType::DECODER,
461
7
            2 => WorkerType::QUERY_DISPATCHER,
462
7
            3 => WorkerType::CACHE_LOOKUP,
463
7
            4 => WorkerType::ZONE_MANAGER,
464
7
            5 => WorkerType::RESOLVER,
465
7
            6 => WorkerType::CACHE_WRITER,
466
7
            7 => WorkerType::ENCODER,
467
4
            8 => WorkerType::SENDER,
468
1
            9 => WorkerType::CACHE_JANITOR,
469
1
            10 => WorkerType::METRICS,
470
5
            11 => WorkerType::TCP_ACCEPTOR,
471
2
            99 => WorkerType::NONE,
472
            // TODO: return an SCloudException
473
0
            _ => return Err(()),
474
        })
475
69
    }
476
}
477
478
#[repr(u8)]
479
#[allow(unused)]
480
#[allow(non_camel_case_types)]
481
#[derive(Debug, PartialEq)]
482
pub(crate) enum WorkerState {
483
    INIT = 0,
484
    IDLE = 1,
485
    BUSY = 2,
486
    PAUSED = 3,
487
    STOPPING = 4,
488
    STOPPED = 5,
489
}
490
491
impl TryFrom<u8> for WorkerState {
492
    type Error = ();
493
494
6
    fn try_from(v: u8) -> Result<Self, Self::Error> {
495
6
        Ok(match v {
496
1
            0 => WorkerState::INIT,
497
1
            1 => WorkerState::IDLE,
498
1
            2 => WorkerState::BUSY,
499
1
            3 => WorkerState::PAUSED,
500
1
            4 => WorkerState::STOPPING,
501
1
            5 => WorkerState::STOPPED,
502
            // TODO: return an SCloudException
503
0
            _ => return Err(()),
504
        })
505
6
    }
506
}
507
508
#[repr(u8)]
509
#[allow(unused)]
510
#[allow(non_camel_case_types)]
511
#[derive(Debug, PartialEq)]
512
pub(crate) enum ShutdownMode {
513
    GRACEFUL = 0,
514
    IMMEDIATE = 1,
515
}
516
517
impl TryFrom<u8> for ShutdownMode {
518
    type Error = ();
519
520
2
    fn try_from(v: u8) -> Result<Self, Self::Error> {
521
2
        Ok(match v {
522
1
            0 => ShutdownMode::GRACEFUL,
523
1
            1 => ShutdownMode::IMMEDIATE,
524
            // TODO: return an SCloudException
525
0
            _ => return Err(()),
526
        })
527
2
    }
528
}
529
530
0
pub fn spawn_worker(
531
0
    worker: Arc<SCloudWorker>,
532
0
    gate: Arc<StartGate>,
533
0
) -> tokio::task::JoinHandle<()> {
534
0
    tokio::spawn(async move {
535
0
        gate.wait_turn(worker.get_worker_id()).await;
536
537
0
        if let Err(e) = worker.clone().run(Some(gate.clone())).await {
538
0
            log_error!("Worker {} failed: {:?}", worker.get_worker_id(), e);
539
0
        }
540
541
0
        gate.done().await;
542
0
    })
543
0
}